



  import java.text.SimpleDateFormat

  import org.apache.flink.api.java.tuple.Tuple
  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.windowing.assigners.{ProcessingTimeSessionWindows, TumblingProcessingTimeWindows}
  import org.apache.flink.streaming.api.windowing.time.Time
  import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  import org.apache.flink.util.Collector
  /**
    * Flink streaming demo of processing-time session windows.
    *
    * Reads comma-separated text lines from a socket, turns each well-formed line into a
    * `(key, value, 1)` record, groups the records by key into processing-time session
    * windows with a 10 second gap, and prints one summary line per window.
    *
    * @author: Liu Jun Jun
    * @create: 2020-06-29 13:59
    **/

  object ProcessingTime {

    /**
      * Parses one input line of the form `key,number[,ignored...]`.
      *
      * @param line raw text line received from the socket
      * @return `Some((key, number, 1))` when the line has at least two comma-separated
      *         fields and the second one (after trimming whitespace) is a valid `Long`;
      *         `None` otherwise, so a malformed line is dropped instead of failing the job
      */
    private def parseRecord(line: String): Option[(String, Long, Int)] =
      line.split(",") match {
        case Array(key, value, _*) =>
          // Trim so a stray space or trailing '\r' does not make the number unparsable.
          scala.util.Try(value.trim.toLong).toOption.map(parsed => (key, parsed, 1))
        case _ =>
          None
      }

    /**
      * Builds and runs the streaming job.
      *
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      // Source: one record per text line read from host "Desktop", port 3456.
      val dataDS = env.socketTextStream("Desktop", 3456)

      dataDS
        // Lines that cannot be parsed are skipped rather than crashing the job.
        .flatMap((line: String) => parseRecord(line).toList)
        // Key by the first field; a key-selector function keeps the key typed as String.
        .keyBy(_._1)
        // Session windows on processing time with a 10 second inactivity gap
        // (not a fixed-size tumbling window).
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
        .apply {
          (key: String, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) =>
            // Window bounds are emitted as the raw values of getStart / getEnd.
            out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")
        }
        .print("windows:>>>")

      env.execute()
    }
  }
